package com.tony.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * mqtt订阅消息处理
 *
 * @author tony
 * @date 2021-9-15
 */
@Slf4j
@Component
public class MqttMessageReceiver implements MessageHandler {

    private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 16, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        threadPoolExecutor.execute(() -> doMessage(message));
    }

    /**
     * 消息监听处理
     *
     * @param message
     */
    private void doMessage(Message<?> message) {

        String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
        String payload = String.valueOf(message.getPayload());
        log.info("接收到消息,topic,{},内容:{}", topic, payload);
        // todo 处理接收到的消息
    }

}
